分类
联系方式
  1. 新浪微博
  2. E-mail

DartVM MessageHandler

介绍

每个 Isolate 都有一个 message handler,处理消息对列上所有的传入消息。

该类位于 runtime/vm/message_handler.h。

类继承关系

MessageHandler 基类是一个抽象类,有多种实现类,具体关系如下:

三个类分别是:

  • MessageHandler
  • IsolateMessageHandler
  • NativeMessageHandler

MessageHandler 基类

成员

两条消息队列

消息队列位于 MessageHandler 内,作为成员,有两个:

MessageQueue* queue_;
MessageQueue* oob_queue_;

其中:

  • quque_ 是消息队列
  • oob_queue_ 是控制消息队列

线程池

MessageHandler 还持有了对线程池的引用:

ThreadPool* pool_;

声明周期回调

起停生命周期还有回调:

StartCallback start_callback_;
EndCallback end_callback_;
CallbackData callback_data_;

公有方法

Run

// 在线程池上运行 message handler

// 在处理消息之前,会先调用 StartFunction 回调
// message handler 开始运行直到结束
// 如果消息处理过程中遇到问题,message handler 会提前退出
void Run(ThreadPool* pool,
         StartCallback start_callback,
         EndCallback end_callback,
         CallbackData data);

handleNextMessage

有两个:

// 处理下一条消息
// 只能在 handler 没有在线程池上运行的时候调用
// 处理成功返回 true
MessageStatus HandleNextMessage();

// 处理下一条 OOB 处理下一条消息
// 只能在 handler 没有在线程池上运行的时候调用
// 处理成功返回 true
MessageStatus HandleOOBMessages();

暂停

MessageHandler 消息处理还支持暂停一段时间:

// 根据条件变量阻塞线程,直到有新消息到来,然后处理所有消息
MessageStatus PauseAndHandleAllMessages(int64_t timeout_millis);

消息类型查找

队列中是否存在某种类型(普通消息和 OOB 消息)的消息:

bool HasOOBMessages();
bool HasMessages();

活跃端口

MessageHandler 与端口有所关联:

// A message handler tracks how many live ports it has.
bool HasLivePorts() const { return live_ports_ > 0; }
intptr_t live_ports() const { return live_ports_; }

暂停计数

MessageHandler 对于暂停状态的维护,采用了引用计数的方式:

bool paused() const { return paused_ > 0; }
void increment_paused() { paused_++; }
void decrement_paused() {
  ASSERT(paused_ > 0);
  paused_--;
}

保护方法

发送消息

通过 PostMessage,可以向消息队列中发送一条消息:

// 向当前 handler 的消息队列发送消息
// 如果 before_events 为 true,将消息插到队头
void PostMessage(std::unique_ptr<Message> message,
                 bool before_events = false);

消息处理

消息处理方法,抽象方法,由子类给出实现。MessageHandler 对消息的解析处理,在该方法中进行:

// Handles a single message. Provided by subclass.
// Returns true on success.
virtual MessageStatus HandleMessage(std::unique_ptr<Message> message) = 0;

私有方法

消息取出

从消息队列中取出一条消息,OOB 消息的优先级高于普通消息:

std::unique_ptr<Message> DequeueMessage(Message::Priority min_priority);

清理 OOB 栈

OOB 栈是允许清理的:

void ClearOOBQueue();

构造场景

通过构造能够看出该类的使用场景,分别寻找这3个类的构造场景。

Isolate setter/getter

在 Isolate 的 C/C++ 层实现中,可以通过 setter/getter 设置 message handler:

MessageHandler* message_handler() const { return message_handler_; };

void set_message_handler(MessageHandler* value) { message_handler_ = value; }

MessageHandler* message_handler_ = nullptr;

Dart_NewNativePort

位于 runtime/vm/native_api_impl.cc:

DART_EXPORT Dart_Port Dart_NewNativePort(const char* name,
                                         Dart_NativeMessageHandler handler,
                                         bool handle_concurrently) {
  // ……

  // Start the native port without a current isolate.
  IsolateLeaveScope saver(Isolate::Current());

  NativeMessageHandler* nmh = new NativeMessageHandler(name, handler);
  Dart_Port port_id = PortMap::CreatePort(nmh);
  PortMap::SetPortState(port_id, PortMap::kLivePort);
  nmh->Run(Dart::thread_pool(), NULL, NULL, 0);
  return port_id;
}

PortMap::CreatePort

messageHandler 跟 Port 也有很深的关联:

Dart_Port PortMap::CreatePort(MessageHandler* handler) {
  // ……
  const Dart_Port port = AllocatePort();

  // The MessageHandler::ports_ is only accessed by [PortMap], it is guarded
  // by the [PortMap::mutex_] we already hold.
  MessageHandler::PortSetEntry isolate_entry;
  isolate_entry.port = port;
  handler->ports_.Insert(isolate_entry);

  Entry entry;
  entry.port = port;
  entry.handler = handler;
  entry.state = kNewPort;
  ports_->Insert(entry);
  // ……
  return entry.port;
}

Run 运行消息处理

消息队列只是一个队列,真正在线程池上运行的是消息处理器(MessageHandler),来消费掉这个队列。

Run 方法的作用是,在线程上启动消息处理器(被封装成 MessageHandlerTask):

void MessageHandler::Run(ThreadPool* pool,
                         StartCallback start_callback,
                         EndCallback end_callback,
                         CallbackData data) {
  MonitorLocker ml(&monitor_);
  // ......
  pool_ = pool;
  start_callback_ = start_callback;
  end_callback_ = end_callback;
  callback_data_ = data;
  task_running_ = true;
  // 在线程池上运行任务
  const bool launched_successfully = pool_->Run<MessageHandlerTask>(this);
  ASSERT(launched_successfully);
}

调用处,都在哪里进行这个启动操作呢?

  • runtime/vm/kernel_isolate.cc 的 RunKernelTask 里,启动了 isolate 的 messageHanlder
    • KernelIsolate::Start
  • runtime/vm/native_api_impl.cc 的 Dart_NewNativePort 中,启动了 NativeMessageHandler 的 messageHandler

postMessage 发消息

通过 Port 机制,向消息循环发送消息,是通过该方法实现的。

void MessageHandler::PostMessage(std::unique_ptr<Message> message,
                                 bool before_events) {
  Message::Priority saved_priority;

  {
    MonitorLocker ml(&monitor_);
    if (FLAG_trace_isolates) {
      Isolate* source_isolate = Isolate::Current();
      // ......
    }

    saved_priority = message->priority();
    // 消息入队
    if (message->IsOOB()) {
      oob_queue_->Enqueue(std::move(message), before_events);
    } else {
      queue_->Enqueue(std::move(message), before_events);
    }
    if (paused_for_messages_) {
      ml.Notify();
    }

    // 如果消息队列没有在线程上运行,那么在线程池上运行当前任务队列
    if (pool_ != nullptr && !task_running_) {
      task_running_ = true;
      const bool launched_successfully = pool_->Run<MessageHandlerTask>(this);
    }
  }

  // Invoke any custom message notification.
  // 最后还有一个通知操作
  MessageNotify(saved_priority);
}

MessageNotify 消息通知

在基类中只给了一个空实现。在派生类中是怎么使用这个方法的?

在 IsolateMessageHandler 的 MessageNotify 中,加入了额外处理逻辑。可以看到,有一个 OOB 类型消息优先响应,以及触发 embedder 的回调。